{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Creating Output Streams with StreamPipes Functions"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A StreamPipes Function is a practical tool for receiving live data from, and sending live data to, your StreamPipes instance. This tutorial provides sample code for a StreamPipes Function that defines an output stream, so you can get started right away by following the steps below."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Connecting to your StreamPipes instance\n",
    "\n",
    "To receive and send live data from and to StreamPipes, you first need to connect to your instance by creating a StreamPipes client in the usual way. In this case we connect to a local instance that uses Kafka. If you haven't done so already, you need to expose the Kafka port manually by adding the following port mapping to the `kafka` node in the Docker Compose file:  \n",
    "\n",
    "```yml\n",
    "ports:\n",
    "  - \"9094:9094\"\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "os.environ[\"BROKER-HOST\"] = \"localhost\"\n",
    "os.environ[\"KAFKA-PORT\"] = \"9094\"\n",
    "api_key = os.environ.get(\"TOKEN\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-07-30 12:44:07,912 - streampipes.client.client - [INFO] - [client.py:199] [_set_up_logging] - Logging successfully initialized with logging level INFO.\n",
      "2024-07-30 12:44:07,995 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:164] [_make_request] - Successfully retrieved all resources.\n",
      "2024-07-30 12:44:07,995 - streampipes.client.client - [INFO] - [client.py:172] [_get_server_version] - The StreamPipes version was successfully retrieved from the backend: 0.95.1. By means of that, authentication via the provided credentials is also tested successfully.\n"
     ]
    }
   ],
   "source": [
    "from streampipes.client import StreamPipesClient\n",
    "from streampipes.client.config import StreamPipesClientConfig\n",
    "from streampipes.client.credential_provider import StreamPipesApiKeyCredentials\n",
    "\n",
    "client_config = StreamPipesClientConfig(\n",
    "    credential_provider=StreamPipesApiKeyCredentials(username=\"admin@streampipes.apache.org\", api_key=api_key),\n",
    "    host_address=\"localhost\",\n",
    "    port=80,\n",
    "    https_disabled=True,\n",
    ")\n",
    "client = StreamPipesClient(client_config=client_config)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating an output stream\n",
    "\n",
    "You can create an output stream by calling `create_data_stream` inside the function's `__init__` method. You need to specify the `name` under which the stream is displayed in StreamPipes, along with its `stream_id`, `attributes`, and `broker`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def __init__(self, input_stream: DataStream):\n",
    "    output_stream = create_data_stream(\n",
    "        name=\"example-stream\",\n",
    "        stream_id=\"example-stream\",\n",
    "        attributes={\n",
    "            \"number\": RuntimeType.INTEGER.value\n",
    "        },\n",
    "        broker=get_broker_description(input_stream)\n",
    "    )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can now add this ``output_stream`` to the function definition and pass it to the parent class."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "function_definition = FunctionDefinition(\n",
    "    consumed_streams=[input_stream.element_id]\n",
    ").add_output_data_stream(output_stream)\n",
    "\n",
    "super().__init__(function_definition=function_definition)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To create the output event, we define a dictionary whose key matches the key we defined for `attributes` in the `create_data_stream` call. For simplicity, the value is always 1. Finally, we call the `add_output` method and use the `function_definition` specified above to look up the `stream_id`. All of this happens inside the function's `onEvent` method, because that method is called for every incoming event."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "output = {\n",
    "    \"number\": 1\n",
    "}\n",
    "            \n",
    "self.add_output(\n",
    "    stream_id=self.function_definition.get_output_stream_ids()[0],\n",
    "    event=output\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating the StreamPipes Function\n",
    "\n",
    "The code below shows an example function and also demonstrates how to include the output stream creation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "from typing import Dict, Any\n",
    "from streampipes.functions.streampipes_function import StreamPipesFunction\n",
    "from streampipes.functions.utils.data_stream_generator import create_data_stream, RuntimeType\n",
    "from streampipes.functions.broker.broker_handler import get_broker_description\n",
    "from streampipes.model.resource import FunctionDefinition, DataStream\n",
    "from streampipes.functions.utils.function_context import FunctionContext\n",
    "from streampipes.functions.function_handler import FunctionHandler\n",
    "from streampipes.functions.registration import Registration\n",
    "\n",
    "class SimpleFunction(StreamPipesFunction):\n",
    "    def __init__(self, input_stream: DataStream):\n",
    "        output_stream = create_data_stream(\n",
    "            name=\"example-stream\",\n",
    "            stream_id=\"example-stream\",\n",
    "            attributes={\n",
    "                \"number\": RuntimeType.INTEGER.value\n",
    "            },\n",
    "            broker=get_broker_description(input_stream)\n",
    "        )\n",
    "        \n",
    "        function_definition = FunctionDefinition(\n",
    "            consumed_streams=[input_stream.element_id]\n",
    "        ).add_output_data_stream(output_stream)\n",
    "\n",
    "        super().__init__(function_definition=function_definition)\n",
    "\n",
    "    def onServiceStarted(self, context: FunctionContext):\n",
    "        pass\n",
    "\n",
    "    def onEvent(self, event: Dict[str, Any], streamId: str):\n",
    "        output = {\n",
    "            \"number\": 1\n",
    "        }\n",
    "            \n",
    "        self.add_output(\n",
    "            stream_id=self.function_definition.get_output_stream_ids()[0],\n",
    "            event=output\n",
    "        )\n",
    "\n",
    "    def onServiceStopped(self):\n",
    "        pass"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Instantiating and starting the function\n",
    "\n",
    "To instantiate the function, we pass it the required input stream. We then register the function with an instance of the `Registration` class and use the function handler to initialize all registered functions via the client. In this example the input stream is named `demo`; replace this with the name of the input stream you want to use."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-07-30 12:44:25,052 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:164] [_make_request] - Successfully retrieved all resources.\n",
      "2024-07-30 12:44:25,150 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:164] [_make_request] - Successfully retrieved all resources.\n",
      "2024-07-30 12:44:25,150 - streampipes.functions.function_handler - [INFO] - [function_handler.py:84] [initializeFunctions] - Using output data stream 'example-stream' for function '0e4287a7-6936-4fcb-9de4-8d29a7b6c1c6'\n",
      "2024-07-30 12:44:25,229 - streampipes.endpoint.endpoint - [INFO] - [endpoint.py:164] [_make_request] - Successfully retrieved all resources.\n",
      "2024-07-30 12:44:25,230 - streampipes.functions.function_handler - [INFO] - [function_handler.py:100] [initializeFunctions] - Using KafkaConsumer for SimpleFunction\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-07-30 12:44:25,247 - streampipes.functions.broker.kafka.kafka_publisher - [INFO] - [kafka_publisher.py:49] [_make_connection] - Connecting to Kafka at localhost:9094\n",
      "2024-07-30 12:44:25,250 - streampipes.functions.broker.kafka.kafka_consumer - [INFO] - [kafka_consumer.py:52] [_make_connection] - Connecting to Kafka at localhost:9094\n",
      "2024-07-30 12:44:25,251 - streampipes.functions.broker.kafka.kafka_consumer - [INFO] - [kafka_consumer.py:62] [_create_subscription] - Subscribing to stream: sp:spdatastream:PIgznU\n",
      "2024-07-30 12:46:12,229 - streampipes.functions.broker.kafka.kafka_publisher - [INFO] - [kafka_publisher.py:73] [disconnect] - Stopped connection to stream: example-stream\n"
     ]
    }
   ],
   "source": [
    "stream = [stream for stream in client.dataStreamApi.all() if stream.name == \"demo\"][0]\n",
    "\n",
    "simple_function = SimpleFunction(input_stream=stream)\n",
    "\n",
    "registration = Registration()\n",
    "registration.register(simple_function)\n",
    "\n",
    "function_handler = FunctionHandler(registration, client)\n",
    "function_handler.initializeFunctions()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If we now open StreamPipes, create a new pipeline, and view the info for our output stream, we can see the live events coming in.  \n",
    "\n",
    "![](https://raw.githubusercontent.com/apache/streampipes/dev/streampipes-client-python/docs/img/tutorial-output-stream.png)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To stop the function we call the function handler's `disconnect` method:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-07-30 12:46:12,171 - streampipes.functions.broker.kafka.kafka_consumer - [INFO] - [kafka_consumer.py:72] [disconnect] - Stopped connection to stream: sp:spdatastream:PIgznU\n"
     ]
    }
   ],
   "source": [
    "function_handler.disconnect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In conclusion, this tutorial has successfully demonstrated how to generate output streams using a StreamPipes Function.  \n",
    "We hope you found this tutorial helpful and would appreciate your feedback. Please visit our GitHub discussion page to share your impressions. We promise to read and respond to every comment!\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": ".venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
